/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.service;

import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingCluster;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.DockerCredentialTokenIdentifier;
import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.component.ComponentState;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.apache.hadoop.yarn.util.DockerClientConfigHandler;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeoutException;

import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

public class TestServiceAM extends ServiceTestUtils{

  private static final Logger LOG =
      LoggerFactory.getLogger(TestServiceAM.class);

  private File basedir;
  YarnConfiguration conf = new YarnConfiguration();
  TestingCluster zkCluster;
  @Rule
  public ServiceTestUtils.ServiceFSWatcher rule =
      new ServiceTestUtils.ServiceFSWatcher();

  @Before
  public void setup() throws Exception {
    basedir = new File("target", "apps");
    if (basedir.exists()) {
      FileUtils.deleteDirectory(basedir);
    } else {
      basedir.mkdirs();
    }
    zkCluster = new TestingCluster(1);
    zkCluster.start();
    conf.set(KEY_REGISTRY_ZK_QUORUM, zkCluster.getConnectString());
    LOG.info("ZK cluster: {}", zkCluster.getConnectString());
  }

  @After
  public void tearDown() throws IOException {
    if (basedir != null) {
      FileUtils.deleteDirectory(basedir);
    }
    if (zkCluster != null) {
      zkCluster.stop();
    }
  }

  // Race condition YARN-7486
  // 1. Allocate 1 container to compa and wait it to be started
  // 2. Fail this container, and in the meanwhile allocate the 2nd container.
  // 3. The 2nd container should not be assigned to compa-0 instance, because
  //   the compa-0 instance is not stopped yet.
  // 4. check compa still has the instance in the pending list.
  @Test
  public void testContainerCompleted() throws TimeoutException,
      InterruptedException {
    ApplicationId applicationId = ApplicationId.newInstance(123456, 1);
    Service exampleApp = new Service();
    exampleApp.setId(applicationId.toString());
    exampleApp.setVersion("v1");
    exampleApp.setName("testContainerCompleted");
    exampleApp.addComponent(createComponent("compa", 1, "pwd"));

    MockServiceAM am = new MockServiceAM(exampleApp);
    am.init(conf);
    am.start();

    ComponentInstance compa0 = am.getCompInstance("compa", "compa-0");
    // allocate a container
    am.feedContainerToComp(exampleApp, 1, "compa");
    am.waitForCompInstanceState(compa0, ComponentInstanceState.STARTED);

    LOG.info("Fail the container 1");
    // fail the container
    am.feedFailedContainerToComp(exampleApp, 1, "compa");

    // allocate the second container immediately, this container will not be
    // assigned to comp instance
    // because the instance is not yet added to the pending list.
    am.feedContainerToComp(exampleApp, 2, "compa");

    am.waitForCompInstanceState(compa0, ComponentInstanceState.INIT);
    // still 1 pending instance
    Assert.assertEquals(1,
        am.getComponent("compa").getPendingInstances().size());
    am.stop();
  }

  // Test to verify that the containers of previous attempt are not prematurely
  // released. These containers are sent by the RM to the AM in the
  // heartbeat response.
  @Test(timeout = 200000)
  public void testContainersFromPreviousAttemptsWithRMRestart()
      throws Exception {
    ApplicationId applicationId = ApplicationId.newInstance(
        System.currentTimeMillis(), 1);
    Service exampleApp = new Service();
    exampleApp.setId(applicationId.toString());
    exampleApp.setVersion("v1");
    exampleApp.setName("testContainersRecovers");
    String comp1Name = "comp1";
    String comp1InstName = "comp1-0";

    org.apache.hadoop.yarn.service.api.records.Component compA =
        createComponent(comp1Name, 1, "sleep");
    exampleApp.addComponent(compA);

    MockServiceAM am = new MockServiceAM(exampleApp);
    ContainerId containerId = am.createContainerId(1);
    am.feedRegistryComponent(containerId, comp1Name, comp1InstName);
    am.init(conf);
    am.start();

    ComponentInstance comp10 = am.getCompInstance(comp1Name, comp1InstName);
    am.feedRecoveredContainer(containerId, comp1Name);
    am.waitForCompInstanceState(comp10, ComponentInstanceState.STARTED);

    // 0 pending instance
    Assert.assertEquals(0,
        am.getComponent(comp1Name).getPendingInstances().size());

    GenericTestUtils.waitFor(() -> am.getCompInstance(comp1Name, comp1InstName)
        .getContainerStatus() != null, 2000, 200000);

    Assert.assertEquals("container state",
        org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
        am.getCompInstance(comp1Name, comp1InstName).getContainerStatus()
            .getState());
    am.stop();
  }

  // Test to verify that the containers of previous attempt are released and the
  // component instance is added to the pending queue when the recovery wait
  // time interval elapses.
  @Test(timeout = 200000)
  public void testContainersReleasedWhenExpired()
      throws Exception {
    ApplicationId applicationId = ApplicationId.newInstance(
        System.currentTimeMillis(), 1);
    Service exampleApp = new Service();
    exampleApp.setId(applicationId.toString());
    exampleApp.setName("testContainersRecovers");
    exampleApp.setVersion("v1");
    String comp1Name = "comp1";
    String comp1InstName = "comp1-0";

    org.apache.hadoop.yarn.service.api.records.Component compA =
        createComponent(comp1Name, 1, "sleep");
    exampleApp.addComponent(compA);

    MockServiceAM am = new MockServiceAM(exampleApp);
    ContainerId containerId = am.createContainerId(1);
    am.feedRegistryComponent(containerId, comp1Name, comp1InstName);
    conf.setLong(YarnServiceConf.CONTAINER_RECOVERY_TIMEOUT_MS, 10);
    am.init(conf);
    am.start();
    Thread.sleep(100);
    GenericTestUtils.waitFor(() -> am.getComponent(comp1Name).getState()
        .equals(ComponentState.FLEXING), 100, 2000);

    // 1 pending instance
    Assert.assertEquals(1, am.getComponent(comp1Name).getPendingInstances()
        .size());

    am.feedContainerToComp(exampleApp, 2, comp1Name);

    GenericTestUtils.waitFor(() -> am.getCompInstance(comp1Name, comp1InstName)
        .getContainerStatus() != null, 2000, 200000);
    Assert.assertEquals("container state",
        org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
        am.getCompInstance(comp1Name, comp1InstName).getContainerStatus()
            .getState());
  }

  // Test to verify that the AM doesn't wait for containers of a different app
  // even though it corresponds to the same service.
  @Test(timeout = 200000)
  public void testContainersFromDifferentApp()
      throws Exception {
    ApplicationId applicationId = ApplicationId.newInstance(
        System.currentTimeMillis(), 1);
    Service exampleApp = new Service();
    exampleApp.setId(applicationId.toString());
    exampleApp.setName("testContainersFromDifferentApp");
    exampleApp.setVersion("v1");
    String comp1Name = "comp1";
    String comp1InstName = "comp1-0";

    org.apache.hadoop.yarn.service.api.records.Component compA =
        createComponent(comp1Name, 1, "sleep");
    exampleApp.addComponent(compA);

    MockServiceAM am = new MockServiceAM(exampleApp);
    ContainerId containerId = am.createContainerId(1);
    // saves the container in the registry
    am.feedRegistryComponent(containerId, comp1Name, comp1InstName);

    ApplicationId changedAppId = ApplicationId.newInstance(
        System.currentTimeMillis(), 2);
    exampleApp.setId(changedAppId.toString());
    am.init(conf);
    am.start();
    // 1 pending instance since the container in registry belongs to a different
    // app.
    Assert.assertEquals(1,
        am.getComponent(comp1Name).getPendingInstances().size());

    am.feedContainerToComp(exampleApp, 1, comp1Name);
    GenericTestUtils.waitFor(() -> am.getCompInstance(comp1Name, comp1InstName)
        .getContainerStatus() != null, 2000, 200000);

    Assert.assertEquals("container state",
        org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
        am.getCompInstance(comp1Name, comp1InstName).getContainerStatus()
            .getState());
    am.stop();
  }

  @Test
  public void testScheduleWithMultipleResourceTypes()
      throws TimeoutException, InterruptedException, IOException {
    ApplicationId applicationId = ApplicationId.newInstance(123456, 1);
    Service exampleApp = new Service();
    exampleApp.setId(applicationId.toString());
    exampleApp.setName("testScheduleWithMultipleResourceTypes");
    exampleApp.setVersion("v1");

    List<ResourceTypeInfo> resourceTypeInfos = new ArrayList<>(
        ResourceUtils.getResourcesTypeInfo());
    // Add 3rd resource type.
    resourceTypeInfos.add(ResourceTypeInfo
        .newInstance("resource-1", "", ResourceTypes.COUNTABLE));
    // Reinitialize resource types
    ResourceUtils.reinitializeResources(resourceTypeInfos);

    Component serviceCompoent = createComponent("compa", 1, "pwd");
    serviceCompoent.getResource().setResourceInformations(ImmutableMap
        .of("resource-1", new ResourceInformation().value(3333L).unit("Gi")));
    exampleApp.addComponent(serviceCompoent);

    MockServiceAM am = new MockServiceAM(exampleApp);
    am.init(conf);
    am.start();

    ServiceScheduler serviceScheduler = am.context.scheduler;
    AMRMClientAsync<AMRMClient.ContainerRequest> amrmClientAsync =
        serviceScheduler.getAmRMClient();

    Collection<AMRMClient.ContainerRequest> rr =
        amrmClientAsync.getMatchingRequests(0);
    Assert.assertEquals(1, rr.size());

    org.apache.hadoop.yarn.api.records.Resource capability =
        rr.iterator().next().getCapability();
    Assert.assertEquals(3333L, capability.getResourceValue("resource-1"));
    Assert.assertEquals("Gi",
        capability.getResourceInformation("resource-1").getUnits());

    am.stop();
  }

  @Test
  public void testContainerCompletedEventProcessed() throws Exception {
    ServiceContext context = createServiceContext("abc");
    MockServiceScheduler scheduler = new MockServiceScheduler(context);
    scheduler.init(conf);
    ApplicationId appId = ApplicationId.newInstance(0, 0);
    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
        1);
    ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 0);
    ContainerStatus containerStatus1 = ContainerStatus.newInstance(containerId1,
        org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
        "successful", 0);
    ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 1);
    ContainerStatus containerStatus2 = ContainerStatus.newInstance(containerId2,
        org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
        "successful", 0);
    ComponentInstance instance = Mockito.mock(ComponentInstance.class);
    Mockito.doReturn("componentInstance").when(instance).getCompName();
    scheduler.addLiveCompInstance(containerId2, instance);
    List<ContainerStatus> statuses = new ArrayList<>();
    // First container instance will be null
    statuses.add(containerStatus1);
    // Second container instance is added
    scheduler.addLiveCompInstance(containerId2, instance);
    statuses.add(containerStatus2);
    scheduler.callbackHandler.onContainersCompleted(statuses);
    // For second container event should be dispatched.
    verify(scheduler.dispatcher, times(1)).getEventHandler();
    DefaultMetricsSystem.shutdown();
  }

  private ServiceContext createServiceContext(String name)
      throws Exception {
    Artifact artifact = new Artifact();
    artifact.setId("1");
    artifact.setType(Artifact.TypeEnum.TARBALL);
    Service serviceDef = ServiceTestUtils.createExampleApplication();
    ApplicationId applicationId = ApplicationId.newInstance(
        System.currentTimeMillis(), 1);
    serviceDef.setId(applicationId.toString());
    serviceDef.setName(name);
    serviceDef.setState(ServiceState.STARTED);
    serviceDef.getComponents().forEach(component ->
        component.setArtifact(artifact));
    ServiceContext context = new MockRunningServiceContext(rule,
        serviceDef);
    context.scheduler.getDispatcher().setDrainEventsOnStop();
    context.scheduler.getDispatcher().start();
    return context;
  }

  class MockServiceScheduler extends ServiceScheduler {
    private AsyncDispatcher dispatcher;
    private AMRMClientCallback callbackHandler = new AMRMClientCallback();

    MockServiceScheduler(ServiceContext context) {
      super(context);
    }

    @Override
    protected AsyncDispatcher createAsyncDispatcher() {
      dispatcher = Mockito.mock(AsyncDispatcher.class);
      EventHandler<Event> handler = Mockito.mock(EventHandler.class);
      Mockito.doReturn(handler).when(dispatcher).getEventHandler();
      return dispatcher;
    }

    @Override
    protected AMRMClientAsync<AMRMClient.ContainerRequest> createAMRMClient() {
      return AMRMClientAsync.createAMRMClientAsync(1000, callbackHandler);
    }

  }

  @Test
  public void testRecordTokensForContainers() throws Exception {
    ApplicationId applicationId = ApplicationId.newInstance(123456, 1);
    Service exampleApp = new Service();
    exampleApp.setId(applicationId.toString());
    exampleApp.setName("testContainerCompleted");
    exampleApp.addComponent(createComponent("compa", 1, "pwd"));

    String json = "{\"auths\": "
        + "{\"https://index.docker.io/v1/\": "
        + "{\"auth\": \"foobarbaz\"},"
        + "\"registry.example.com\": "
        + "{\"auth\": \"bazbarfoo\"}}}";
    File dockerTmpDir = new File("target", "docker-tmp");
    FileUtils.deleteQuietly(dockerTmpDir);
    dockerTmpDir.mkdirs();
    String dockerConfig = dockerTmpDir + "/config.json";
    BufferedWriter bw = new BufferedWriter(new FileWriter(dockerConfig));
    bw.write(json);
    bw.close();
    Credentials dockerCred =
        DockerClientConfigHandler.readCredentialsFromConfigFile(
            new Path(dockerConfig), conf, applicationId.toString());


    MockServiceAM am = new MockServiceAM(exampleApp, dockerCred);
    ByteBuffer amCredBuffer = am.recordTokensForContainers();
    Credentials amCreds =
        DockerClientConfigHandler.getCredentialsFromTokensByteBuffer(
            amCredBuffer);

    assertEquals(2, amCreds.numberOfTokens());
    for (Token<? extends TokenIdentifier> tk : amCreds.getAllTokens()) {
      Assert.assertTrue(
          tk.getKind().equals(DockerCredentialTokenIdentifier.KIND));
    }

    am.stop();
  }

  @Test
  public void testIPChange() throws TimeoutException,
      InterruptedException {
    ApplicationId applicationId = ApplicationId.newInstance(123456, 1);
    String comp1Name = "comp1";
    String comp1InstName = "comp1-0";
    Service exampleApp = new Service();
    exampleApp.setId(applicationId.toString());
    exampleApp.setVersion("v1");
    exampleApp.setName("testIPChange");
    Component comp1 = createComponent(comp1Name, 1, "sleep 60");
    comp1.setArtifact(new Artifact().type(Artifact.TypeEnum.DOCKER));
    exampleApp.addComponent(comp1);

    MockServiceAM am = new MockServiceAM(exampleApp);
    am.init(conf);
    am.start();

    ComponentInstance comp1inst0 = am.getCompInstance(comp1Name, comp1InstName);
    // allocate a container
    am.feedContainerToComp(exampleApp, 1, comp1Name);
    GenericTestUtils.waitFor(() -> comp1inst0.getContainerStatus() != null,
        2000, 200000);
    // first host status will match the container nodeId
    Assert.assertEquals("localhost",
        comp1inst0.getContainerStatus().getHost());

    LOG.info("Change the IP and host");
    // change the container status
    am.updateContainerStatus(exampleApp, 1, comp1Name, "new.host");
    GenericTestUtils.waitFor(() -> comp1inst0.getContainerStatus().getHost()
        .equals("new.host"), 2000, 200000);

    LOG.info("Change the IP and host again");
    // change the container status
    am.updateContainerStatus(exampleApp, 1, comp1Name, "newer.host");
    GenericTestUtils.waitFor(() -> comp1inst0.getContainerStatus().getHost()
        .equals("newer.host"), 2000, 200000);
    am.stop();
  }

  /**
   This test verifies that the containers are released and the
   component instance is added to the pending queue when building the launch
   context fails.
   Here, we intentionally have an artifact that doesn't have an id.
   This will cause TarballProviderService.processArtifact
   to throw an IllegalArgumentException because the Path object is
   constructed from the id of the artifact.
   In case the id is set to null or unset so it is effectively null,
   Path.checkPathArg throws an IllegalArgumentException.
  **/
  @Test(timeout = 30000)
  public void testContainersReleasedWhenPreLaunchFails()
      throws Exception {
    ApplicationId applicationId = ApplicationId.newInstance(
        System.currentTimeMillis(), 1);
    Service exampleApp = new Service();
    exampleApp.setId(applicationId.toString());
    exampleApp.setVersion("v1");
    exampleApp.setName("testContainersReleasedWhenPreLaunchFails");

    Component compA = createComponent("compa", 1, "pwd");
    Artifact artifact = new Artifact();
    artifact.setType(Artifact.TypeEnum.TARBALL);
    compA.artifact(artifact);
    exampleApp.addComponent(compA);

    MockServiceAM am = new MockServiceAM(exampleApp);
    am.init(conf);
    am.start();

    ContainerId containerId = am.createContainerId(1);

    // allocate a container
    am.feedContainerToComp(exampleApp, containerId, "compa");
    am.waitForContainerToRelease(containerId);
    ComponentInstance compAinst0 = am.getCompInstance(compA.getName(),
        "compa-0");
    GenericTestUtils.waitFor(() ->
        am.getComponent(compA.getName()).getPendingInstances()
        .contains(compAinst0), 2000, 30000);

    Assert.assertEquals(1,
        am.getComponent("compa").getPendingInstances().size());
    am.stop();
  }

  @Test(timeout = 30000)
  public void testSyncSysFS() {
    ApplicationId applicationId = ApplicationId.newInstance(
        System.currentTimeMillis(), 1);
    Service exampleApp = new Service();
    exampleApp.setId(applicationId.toString());
    exampleApp.setVersion("v1");
    exampleApp.setName("tensorflow");

    Component compA = createComponent("compa", 1, "pwd");
    compA.getConfiguration().getEnv().put(
        "YARN_CONTAINER_RUNTIME_YARN_SYSFS_ENABLE", "true");
    Artifact artifact = new Artifact();
    artifact.setType(Artifact.TypeEnum.TARBALL);
    compA.artifact(artifact);
    exampleApp.addComponent(compA);
    try {
      MockServiceAM am = new MockServiceAM(exampleApp);
      am.init(conf);
      am.start();
      ServiceScheduler scheduler = am.context.scheduler;
      scheduler.syncSysFs(exampleApp);
      scheduler.close();
      am.stop();
      am.close();
    } catch (Exception e) {
      LOG.error("Fail to sync sysfs.", e);
      Assert.fail("Fail to sync sysfs.");
    }
  }

  @Test
  public void testScheduleWithResourceAttributes() throws Exception {
    ApplicationId applicationId = ApplicationId.newInstance(123456, 1);
    Service exampleApp = new Service();
    exampleApp.setId(applicationId.toString());
    exampleApp.setName("testScheduleWithResourceAttributes");
    exampleApp.setVersion("v1");

    List<ResourceTypeInfo> resourceTypeInfos = new ArrayList<>(
        ResourceUtils.getResourcesTypeInfo());
    // Add 3rd resource type.
    resourceTypeInfos.add(ResourceTypeInfo
        .newInstance("test-resource", "", ResourceTypes.COUNTABLE));
    // Reinitialize resource types
    ResourceUtils.reinitializeResources(resourceTypeInfos);

    Component serviceCompoent = createComponent("compa", 1, "pwd");
    serviceCompoent.getResource().setResourceInformations(
        ImmutableMap.of("test-resource",
            new ResourceInformation()
                .value(1234L)
                .unit("Gi")
                .attributes(ImmutableMap.of("k1", "v1", "k2", "v2"))));
    exampleApp.addComponent(serviceCompoent);

    MockServiceAM am = new MockServiceAM(exampleApp);
    am.init(conf);
    am.start();

    ServiceScheduler serviceScheduler = am.context.scheduler;
    AMRMClientAsync<AMRMClient.ContainerRequest> amrmClientAsync =
        serviceScheduler.getAmRMClient();

    Collection<AMRMClient.ContainerRequest> rr =
        amrmClientAsync.getMatchingRequests(0);
    Assert.assertEquals(1, rr.size());

    org.apache.hadoop.yarn.api.records.Resource capability =
        rr.iterator().next().getCapability();
    Assert.assertEquals(1234L, capability.getResourceValue("test-resource"));
    Assert.assertEquals("Gi",
        capability.getResourceInformation("test-resource").getUnits());
    Assert.assertEquals(2, capability.getResourceInformation("test-resource")
        .getAttributes().size());
    am.stop();
  }
}
